//
// Copyright (c) 2023-2024 Ivica Siladic, Bruno Iljazovic, Korina Simicevic
//
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef ASYNC_MQTT5_ASSEMBLE_OP_HPP
#define ASYNC_MQTT5_ASSEMBLE_OP_HPP

#include <chrono>
#include <cstdint>
#include <string>
#include <utility>

#include <boost/asio/append.hpp>
#include <boost/asio/associated_allocator.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/asio/completion_condition.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/prepend.hpp>
#include <boost/system/error_code.hpp>

#include <async_mqtt5/error.hpp>

#include <async_mqtt5/detail/control_packet.hpp>
#include <async_mqtt5/detail/internal_types.hpp>

#include <async_mqtt5/impl/codecs/message_decoders.hpp>

namespace async_mqtt5::detail {

namespace asio = boost::asio;

class data_span : private std::pair<byte_citer, byte_citer> {
	using base = std::pair<byte_citer, byte_citer>;
public:
	using base::base;

	auto first() const {
		return base::first;
	}
	auto last() const {
		return base::second;
	}
	void expand_suffix(size_t num_chars) {
		base::second += num_chars;
	}
	void remove_prefix(size_t num_chars) {
		base::first += num_chars;
	}
	size_t size() const {
		return std::distance(base::first, base::second);
	}
};


template <typename ClientService, typename Handler>
class assemble_op {
	using client_service = ClientService;
	using handler_type = Handler;

	struct on_read {};

	static constexpr uint32_t max_recv_size = 65'536;

	client_service& _svc;
	handler_type _handler;

	std::string& _read_buff;
	data_span& _data_span;

public:
	assemble_op(
		client_service& svc, handler_type&& handler,
		std::string& read_buff, data_span& active_span
	) :
		_svc(svc),
		_handler(std::move(handler)),
		_read_buff(read_buff), _data_span(active_span)
	{}

	assemble_op(assemble_op&&) noexcept = default;
	assemble_op(const assemble_op&) = delete;

	assemble_op& operator=(assemble_op&&) noexcept = default;
	assemble_op& operator=(const assemble_op&) = delete;

	using allocator_type = asio::associated_allocator_t<handler_type>;
	allocator_type get_allocator() const noexcept {
		return asio::get_associated_allocator(_handler);
	}

	using executor_type = typename client_service::executor_type;
	executor_type get_executor() const noexcept {
		return _svc.get_executor();
	}

	template <typename CompletionCondition>
	void perform(CompletionCondition cc) {
		_read_buff.erase(
			_read_buff.cbegin(), _data_span.first()
		);
		_read_buff.resize(
			_svc.connect_property(prop::maximum_packet_size).value_or(max_recv_size)
		);
		_data_span = {
			_read_buff.cbegin(),
			_read_buff.cbegin() + _data_span.size()
		};

		if (cc(error_code {}, 0) == 0 && _data_span.size()) {
			return asio::post(
				asio::prepend(
					std::move(*this), on_read {}, error_code {},
					0, std::move(cc)
				)
			);
		}

		// Must be evaluated before this is moved
		auto store_begin = _read_buff.data() + _data_span.size();
		auto store_size = std::distance(_data_span.last(), _read_buff.cend());

		_svc._stream.async_read_some(
			asio::buffer(store_begin, store_size), compute_read_timeout(),
			asio::prepend(
				asio::append(std::move(*this), std::move(cc)),
				on_read {}
			)
		);
	}

	template <typename CompletionCondition>
	void operator()(
		on_read, error_code ec, size_t bytes_read,
		CompletionCondition cc
	) {
		if (ec == asio::error::try_again) {
			_svc.update_session_state();
			_svc._async_sender.resend();
			_data_span = { _read_buff.cend(), _read_buff.cend() };
			return perform(std::move(cc));
		}

		if (ec)
			return complete(ec, 0, {}, {});

		_data_span.expand_suffix(bytes_read);
		assert(_data_span.size());

		auto control_byte = uint8_t(*_data_span.first());

		if ((control_byte & 0b11110000) == 0)
			// close the connection, cancel
			return complete(client::error::malformed_packet, 0, {}, {});

		auto first = _data_span.first() + 1;
		auto varlen = decoders::type_parse(
			first, _data_span.last(), decoders::basic::varint_
		);

		if (!varlen) {
			if (_data_span.size() < 5)
				return perform(asio::transfer_at_least(1));
			return complete(client::error::malformed_packet, 0, {}, {});
		}

		auto recv_size = _svc.connect_property(prop::maximum_packet_size)
			.value_or(max_recv_size);
		if (static_cast<uint32_t>(*varlen) > recv_size - std::distance(_data_span.first(), first))
			return complete(client::error::malformed_packet, 0, {}, {});

		if (std::distance(first, _data_span.last()) < *varlen)
			return perform(asio::transfer_at_least(1));

		_data_span.remove_prefix(
			std::distance(_data_span.first(), first) + *varlen
		);

		dispatch(control_byte, first, first + *varlen);
	}

private:
	duration compute_read_timeout() const {
		auto negotiated_ka = _svc.negotiated_keep_alive();
		return negotiated_ka ?
			std::chrono::milliseconds(3 * negotiated_ka * 1000 / 2) :
			duration(std::numeric_limits<duration::rep>::max());
	}

	static bool valid_header(uint8_t control_byte) {
		auto code = control_code_e(control_byte & 0b11110000);

		if (code == control_code_e::publish)
			return true;

		auto res = control_byte & 0b00001111;
		if (code == control_code_e::pubrel)
			return res == 0b00000010;
		return res == 0b00000000;
	}

	void dispatch(
		uint8_t control_byte, byte_citer first, byte_citer last
	) {
		using namespace decoders;

		if (!valid_header(control_byte))
			return complete(client::error::malformed_packet, 0, {}, {});

		auto code = control_code_e(control_byte & 0b11110000);

		if (code == control_code_e::pingresp)
			return perform(asio::transfer_at_least(0));

		bool is_reply = code != control_code_e::publish &&
			code != control_code_e::auth &&
			code != control_code_e::disconnect;

		if (is_reply) {
			auto packet_id = decoders::decode_packet_id(first).value();
			_svc._replies.dispatch(error_code {}, code, packet_id, first, last);
			return perform(asio::transfer_at_least(0));
		}

		complete(error_code {}, control_byte, first, last);
	}

	void complete(
		error_code ec, uint8_t control_code,
		byte_citer first, byte_citer last
	) {
		if (ec)
			_data_span = { _read_buff.cend(), _read_buff.cend() };
		std::move(_handler)(ec, control_code, first, last);
	}
};

} // end namespace async_mqtt5::detail

#endif // !ASYNC_MQTT5_ASSEMBLE_OP_HPP
